AWS SDK for pandas (awswrangler)のredshift.to_sqlの利用ポイント
データアナリティクス事業本部のueharaです。
今回は、AWS SDK for pandas (awswrangler)の redshift.to_sql
関数の利用ポイントを紹介したいと思います。
AWS SDK for pandas (awswrangler)とは
awswranglerはAWSの様々なDBサービスとpandasを連携することができるAWS公式のPythonライブラリです。
例えば、以下のようなサービスとの連携に対応しています。
- S3
- RDS
- Athena
- Redshift
- Timestream
- などなど
awswranglerを利用することで、上記AWSサービスに対してpandasのDataFrameから簡単にデータのロード/アンロードすることができます。
Lambdaで利用したいのであればAWSがGithubでLambda Layers用のzipファイルを公開しておりますし、Glue Python Shellで利用したいのであればlibrary-setにanalyticsを指定するだけで良いので簡単に利用ができます。
redshift.to_sql関数とは
redshift.to_sql
関数は、一言で言うとpandas DataFrameからRedshiftにデータを入れるためのawswranglerに用意されている関数になります。
ドキュメントもこちらのページにあるのですが、細かい仕様については触れられていない部分も多々あり、ライブラリのソースコードを直接見に行くことも多いので今回はこちらの関数の利用ポイントをいくつかピックアップして紹介したいと思います。
なお、以降の記載は2023/09/08時点で最新の「AWS SDK for pandas 3.3.0」のソースコードを元にしていますので予めご了承下さい。
ポイント①:3つのデータ挿入モード
データの挿入には append, overwrite, upsert の3つのモードがあり、mode
引数で渡すことができます。
appendはいわゆる「INSERT」、overwriteは「洗い替え(全件削除&INSERT)」、upsertは文字通り「UPSERT(UPDATE&INSERT)」となります。
overwriteについては更に overwrite_method
でテーブルの削除方法を指定できます。指定できる方法と、特徴は以下の通りです。
方法 | 特徴 |
---|---|
drop | テーブルを削除します。ただし、テーブルに依存しているビューがある場合は失敗します。 |
cascade | テーブル、およびそのテーブルに依存しているビューを全て削除します。 |
truncate | トランザクションを即座にコミットする削除方法です。データ挿入時には新しいトランザクションを開始するため、DB用語でいう原子性(Atomicity)は担保されません。 |
delete | データを削除するという点では他の3つと同じですが、 DELETE FROM ... コマンドにより全ての行を削除する操作になります。テーブル定義自体は保持されますが、他の方法に比べて処理速度は遅くなります。 |
truncate以外であれば、削除⇒挿入の流れで処理が失敗した場合、ロールバックする仕様になっています。
したがって、洗い替えの際「挿入データとして渡したDataFrameのカラム数が挿入先のテーブル定義と異なっていた」というようなデータ挿入エラーが発生したとしても、元のテーブルが削除されてしまい件数が0件になってしまうことはありません。
また、UPSERTの場合は更新キーを primary_keys
として渡すことができます。
仮にここで引数として渡さなかった場合、挿入先のテーブルからPRIMARY KEYを取得し、そのキーを元にUPSERTを実施する仕様となっています。明示的に記載したい場合は primary_kyes 引数に渡すと良いと思います。
洗い替えとUPSERTのそれぞれの例をコードで表すと、以下のようになります。
# 洗い替えの例 wr.redshift.to_sql( df=df, table="test_table", schema="public", con=con, mode="overwrite", overwrite_method="delete" ) # UPSERTの例 wr.redshift.to_sql( df=df, table="test_table", schema="public", con=con, mode="upsert", primary_keys = ["pk1", "pk2"] )
ポイント②:挿入先のテーブルがDBに存在しない場合
挿入先のテーブルが存在しない場合、テーブルが新規作成されます。
このとき、定義される型はDataFrameのdtypeに基づくものになります。変換を実施している部分のソースコードは以下の通りです。
例えば、DataFrameの型がuint32
であれば、テーブルはBIGINT
で定義されます。
dtypeがNULLなど変換できない値であった場合は、以下のようなエラーとなります。
UnsupportedType: Unsupported Redshift type: null
作成されるテーブルの型を明示的に指定したい場合は、 redshift.to_sql
に dtype
を引数として渡すことができます。
wr.redshift.to_sql( df=df, table="test_table", schema="public", con=con, mode="overwrite", dtype={"col1_name": "VARCHAR(10)", "col2_name": "FLOAT"}) )
dtypeを指定することにより、例えば TIMESTAMPTZ
など、変換表にない型も定義することができます。
ただ、基本的には予めDDLを用意してテーブルを作成し、洗い替えの際削除は delete
で実施するという方針が運用上管理がしやすいのではないかと思います。
ポイント③:データ挿入に関する内部処理
データの挿入は、当然と言えば当然ですが内部的には INSERT
クエリを作成して実施されます。
例えば、DataFrameが以下だったとします。
df = pd.DataFrame({ "col1": ["hoge1", np.NaN, "hoge3", "hoge4"], "col2": ["foo1", "foo2", "foo3", "foo4"], "col3": [np.NaN, "x", "y", "z"], "col4": [1, 2, 3, 4] })
このとき、1行目のデータを挿入するのに作成されるSQLは以下の通りとなります。
INSERT INTO public.test_table VALUES ('hoge1', 'foo1', NULL, 1)
ちなみに、 redshift.to_sql
関数の use_column_names
という引数を True
にした場合は以下のようになります。
INSERT INTO public.test_table (col1, col2, col3, col4) VALUES ('hoge1', 'foo1', NULL, 1)
上記のように、単純にデータの値をINSERTクエリにVALUESとして渡しているだけなので、データの挿入時には特にDataFrameの型を気にしてはいません。
したがって、以下のDataFrameのcol1のdtypeはStringになりますが、テーブル定義の方が TIMESTAMPTZ
であれば TIMESTAMPTZ
型で入ります。
""" # テーブル定義 CREATE TABLE IF NOT EXISTS public.test_table ( col1 TIMESTAMPTZ ); """ df = pd.DataFrame({ "col1": ["2023-09-08 09:30:00 JST", "2023-09-08 10:30:00 UTC"] # 上のテーブルに入れると TIMESTAMPTZ で入る })
先程説明に出た redshift.to_sql
の引数で指定するdtypeはあくまでテーブルを新規作成する際に利用されるものであり、既に元テーブルがある場合は利用されることの無い値になるのでご注意下さい。
また、まとめてINSERTするデータの行数を chunksize
引数で指定することもできます。デフォルトは200です。
まとめて入れる方が効率的ですが、その分メモリ使用量も増えるので、ユースケースによって適切な値を設定して下さい。
wr.redshift.to_sql( df=df, table="test_table", schema="public", con=con, mode="append", chunksize=500 # まとめて挿入する行数を指定 )
ポイント④:テーブル名の大文字・小文字で問題が発生するケース
Redshiftは、デフォルトではデータベースオブジェクト(テーブルや列など)の大文字・小文字を区別しません。
したがって、 CREATE TABLE
クエリでテーブル名を大文字で指定したとしても、内部では小文字のテーブル名で管理されます。
redshift.to_sql
関数において、overwriteとupsertは既存のテーブルチェックを行います。
overwriteであれば既存のテーブルが存在すれば削除、upsertであれば既存のテーブルが存在すれば一時テーブルを作成して更新・挿入の処理を行います。
既存のテーブルが存在しない場合、どちらもappend(INSERT)と同じ挙動をします。
テーブルの存在確認について、awswranglerは内部的には以下のクエリを実行しています。
# ※{schema}, {table} はそれぞれredshift.to_sql関数に引数として与えられている値 SELECT true WHERE EXISTS ( SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{schema}' AND TABLE_NAME = '{table}' )
ここで、元のテーブルを大文字で作成したからと、 redshift.to_sql
関数に大文字でテーブル名を渡すと、上記の存在チェックでtrueとならないため想定しない挙動となります。
具体的には、例えばoverwriteであれば元テーブルが削除されずそのままデータがINSERTされるという事象が起こります。
wr.redshift.to_sql( df=df, table="TEST_table", # 内部的にはtest_tableのため、存在チェッククエリでtrueとならない schema="public", con=con, mode="overwrite", overwrite_method="delete" )
大文字と小文字を区別するオプションもあり、それによって対応も変わりますが、デフォルトの設定で使用している場合は注意しましょう。
ポイント⑤:chunksizeを大きくしすぎたときのエラー
テーブルに挿入するカラム数が多く、かつchunksizeを大きくしすぎた時、以下のようなエラーが発生します。
error: 'h' format requires -32768 <= number <= 32767
これは以下のようなクエリ内の placeholders (%s) に関する制限であり、このplaceholdersのカウンターがshort
型であるため、このようなエラーが発生します。
INSERT INTO "public"."test_table" VALUES (%s, %s, %s)
本エラーがでた場合、chunksizeを小さくすることで利用するplaceholdersの数を少なくし、エラーを解消することができます。
ポイント⑥:INT型のテーブルにnanを含む値を入れる際の注意点
NULLを含む整数値のカラムを持つcsvファイルをpandasのDataFrameで読み込んだ場合、デフォルトでは float64
と判定されます。
例えば、下記の場合col2カラムは float64
で読み込まれます。
col1,col2,col3 1,,7 2,5,8 3,6,9
これは、DataFrameのデフォルトの整数型である int64
が nan
という値を持てないためです。
この nan
を含む float64
のカラム持つDataFrameを、RedshiftのINT型のテーブルに redshift.to_sql
でデータを挿入しようとするとエラーとなります。
invalid input syntax for integer: "nan"
対策としては、DataFrameを全て object
型にすることが考えられます。
ポイント③でお話した通り、 redshift.to_sql
関数では単にINSERTクエリのVALUESに値を渡しているだけなので文字列としてしまっても問題ありません。
そうしてしまえば INSERT クエリ発行時に nan
の部分は NULL
となるので、問題なく挿入できるようになります。
... df = df.astype(object) wr.redshift.to_sql( df=df, table="test_table", schema="public", con=con, mode="append" )
ポイント⑦:処理実行後のcommitタイミング
まず大前提として、AWS SDK for pandas (awswrangler)が裏側で利用している redshift_connector
はデフォルトで autocommit
がデフォルトで False
になっています。
また、仮に上記を True
に設定していたとしても、redshift.to_sql
関数では引数で渡されたコネクションの autocommit
設定を以下の通り参照しません。
autocommit_temp: bool = con.autocommit con.autocommit = False
redshift.to_sql
関数の commit_transaction
引数はデフォルトで True
になっているため、正常に終了が終了した後に con.commit()
が実行されます。
従って、仮にその前段で con.cursor().execute()
を実行してcommitしていないものがあっても、このタイミングでcommitされます。
commit_transaction
で False
を設定すると redshift.to_sql
関数の処理終了後にもcommitはしませんので、その方が都合が良いという場合は False
に設定して下さい。
ただし、redshift.to_sql
関数の実行が失敗した場合は(ポイント①で述べたように)truncate以外は con.rollback()
が呼ばれますので、その時点でcommitしていないcon.cursor().execute()
の実行処理分は破棄されることになります。
最後に
今回は、AWS SDK for pandas (awswrangler)の redshift.to_sql
関数の利用ポイントを紹介させていただきました。
参考になりましたら幸いです。
参考文献
- Releases · aws/aws-sdk-pandas
- AWS Glue Python shell now supports Python 3.9 with a flexible pre-loaded environment and support to install additional libraries
- awswrangler.redshift.to_sql — AWS SDK for pandas 3.3.0 documentation
- Increasing Chunk Size on awswrangler::postgres::to_sql() causes struct packing error